9d47a52c531458434dd522aff05bee870fd84915,cdap-watchdog/src/test/java/co/cask/cdap/logging/save/LogSaverTest.java,LogSaverTest,startLogSaver,#,107

Before Change



  /**
   * Class-level setup, annotated with JUnit's {@code @BeforeClass}.
   * It begins by creating an in-memory dataset framework, registering the
   * "table" module with it, and creating a fresh folder to use as the log base directory.
   *
   * @throws Exception if any setup step fails
   */
  @BeforeClass
  public static void startLogSaver() throws Exception {
    // Back the test with an in-memory dataset framework and register the in-memory
    // table module under the name "table" in the system namespace.
    dsFramework = new InMemoryDatasetFramework(new InMemoryDefinitionRegistryFactory());
    dsFramework.addModule(Id.DatasetModule.from(Constants.SYSTEM_NAMESPACE_ID, "table"),
                          new InMemoryTableModule());

    // Use a newly created folder as the log base directory and log its absolute path.
    // presumably temporaryFolder is a JUnit TemporaryFolder rule -- verify against its declaration
    String logBaseDir = temporaryFolder.newFolder().getAbsolutePath();
    LOG.info("Log base dir {}", logBaseDir);

After Change



    // HBase configuration, passed to ConfigModule alongside cConf.
    Configuration hConf = HBaseConfiguration.create();

    // Assemble the Guice injector from the config, ZooKeeper client and Kafka client modules,
    // the in-memory location and transaction modules, and an inline module that binds
    // DatasetFramework to a single in-memory instance.
    injector = Guice.createInjector(
      new ConfigModule(cConf, hConf),
      new ZKClientModule(),
      new KafkaClientModule(),
      new LocationRuntimeModule().getInMemoryModules(),
      new TransactionModules().getInMemoryModules(),
      new AbstractModule() {
        @Override
        protected void configure() {
          bind(DatasetFramework.class)
            .toInstance(new InMemoryDatasetFramework(new InMemoryDefinitionRegistryFactory()));
        }
      });

    // Start the transaction manager and wait for it to finish starting.
    txManager = injector.getInstance(TransactionManager.class);
    txManager.startAndWait();

    // Register the in-memory table module under the name "table" in the system namespace
    // on the DatasetFramework instance provided by the injector.
    DatasetFramework dsFramework = injector.getInstance(DatasetFramework.class);
    dsFramework.addModule(Id.DatasetModule.from(Constants.SYSTEM_NAMESPACE_ID, "table"),
                          new InMemoryTableModule());

    // Start the ZooKeeper client first, then the Kafka client.
    ZKClientService zkClientService = injector.getInstance(ZKClientService.class);
    zkClientService.startAndWait();

    KafkaClientService kafkaClient = injector.getInstance(KafkaClientService.class);
    kafkaClient.startAndWait();

    // Start the log saver itself.
    LogSaver logSaver = injector.getInstance(LogSaver.class);
    logSaver.startAndWait();

    // Start a leader election that is given the ZooKeeper client and the log saver.
    // NOTE(review): "log-saver" and 2 look like the election name and partition count -- confirm
    // against the MultiLeaderElection constructor.
    MultiLeaderElection multiElection = new MultiLeaderElection(zkClientService, "log-saver", 2, logSaver);
    multiElection.setLeaderElectionSleepMs(1);
    multiElection.startAndWait();

    // Sleep for a while to let the Kafka server fully initialize.
    TimeUnit.SECONDS.sleep(5);

    // Publish the test log messages (helper defined elsewhere in this class).
    publishLogs();

    // Resolve the log base directory configured in cConf through the injected LocationFactory.
    LocationFactory locationFactory = injector.getInstance(LocationFactory.class);
    Location logBaseDir = locationFactory.create(cConf.get(LoggingConfiguration.LOG_BASE_DIR));

    // presumably blocks until the given message shows up under the given path pattern --
    // verify against waitTillLogSaverDone.
    waitTillLogSaverDone(logBaseDir, "ACCT_1/APP_1/flow-FLOW_1/%s", "Test log message 59 arg1 arg2");